package com.gl.sass.mq.consumer;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import com.gl.sass.mq.message.BaseMessage;

import java.util.List;
import java.util.Set;

/**
 * Base consumer.
 *
 * <p>Template for concurrent RocketMQ listeners: the raw batch is wrapped in a
 * {@link BaseMessage}, filtered (invalid / over-retried messages are acknowledged
 * and dropped), logged, and then handed to {@link #consumeMessage(BaseMessage)}.
 *
 * <p>NOTE(review): this class is abstract, so Spring's classpath scanning will
 * presumably not register it as a bean itself, and {@code @Component} is not an
 * inherited annotation — confirm that concrete subclasses carry their own
 * stereotype annotation.
 *
 * @author xiehong
 */
@Component
public abstract class BaseConsumer implements MessageListenerConcurrently {
    private static final Logger log = LoggerFactory.getLogger(BaseConsumer.class);

    /** A message whose reconsume count exceeds this value is acknowledged and dropped. */
    private static final int MAX_RECONSUME_TIMES = 5;

    /**
     * Entry point invoked by RocketMQ for each delivered batch.
     *
     * @param list                       the delivered messages
     * @param consumeConcurrentlyContext the consume context supplied by the client
     * @return {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS} when the message is
     *         dropped by one of the guards below, otherwise whatever
     *         {@link #consumeMessage(BaseMessage)} returns
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
                                                    ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        BaseMessage baseMessage = convert(list, consumeConcurrentlyContext);

        // Invalid (logged as timed-out) messages are acknowledged so they are not redelivered.
        if (baseMessage.isMsgInValid()) {
            log.warn("msg1=消息已经超时,放弃消费,,message={},,tag={},,msgid={},, storeTimestamp={}", baseMessage.getFirstBody(),
                    baseMessage.getFirstTags(), baseMessage.getFirstMsgId(), baseMessage.getStoreTimestamp());
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

        // Give up once the retry budget is exhausted; acknowledging stops further retries.
        if (baseMessage.getReconsumeTimes() > MAX_RECONSUME_TIMES) {
            // The format string previously lacked the message={} placeholder, so every
            // value after "times" was logged under the wrong label and the store
            // timestamp was silently discarded.
            log.warn("msg1=消息已经重试达到规定次数,放弃消费: times={},,message={},,tag={},,msgid={},,storeTimestamp={}",
                    baseMessage.getReconsumeTimes(), baseMessage.getFirstBody(), baseMessage.getFirstTags(),
                    baseMessage.getFirstMsgId(), baseMessage.getStoreTimestamp());
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

        log.info("msg1=MQ消息内容,,message={},,tag={},,msgid={},,storeTimestamp={},, reconsumeTimes={}",
                baseMessage.getFirstObject(String.class), baseMessage.getFirstTags(), baseMessage.getFirstMsgId(),
                baseMessage.getStoreTimestamp(), baseMessage.getReconsumeTimes());
        return consumeMessage(baseMessage);
    }

    /**
     * Business-level handler implemented by concrete consumers. Called only for
     * messages that passed the validity and retry-count guards.
     *
     * @param baseMessage the wrapped message batch
     * @return the consume status to report back to RocketMQ
     */
    public abstract ConsumeConcurrentlyStatus consumeMessage(BaseMessage baseMessage);

    /**
     * Returns the tag set associated with this consumer.
     * Presumably the tags the consumer subscribes to — verify against the caller.
     *
     * @return the tag set
     */
    public abstract Set<String> getTagSet();

    /**
     * Wraps the raw RocketMQ batch and context in a {@link BaseMessage}.
     * Subclasses may override to supply a different wrapper.
     *
     * @param list                       the delivered messages
     * @param consumeConcurrentlyContext the consume context supplied by the client
     * @return a new {@link BaseMessage} built from the arguments
     */
    public BaseMessage convert(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        return new BaseMessage(list, consumeConcurrentlyContext);
    }

}
